package ink.tsg.flink.leftJoin;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @author tsg
 * @version 1.0
 * @description: TODO
 * @date 2022/5/17 17:10
 */
public class TestWindowJoin {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //左流
        DataStreamSource<SensorRecord> leftSource = env.fromElements(
                new SensorRecord("1",20.5d, LocalDateTime.parse("2021-01-02 10:00:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("2",21.5d,LocalDateTime.parse("2021-01-02 10:00:10", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("3",22.5d,LocalDateTime.parse("2021-01-02 10:00:20", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("4",23.5d,LocalDateTime.parse("2021-01-02 10:00:30", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("5",24.5d,LocalDateTime.parse("2021-01-02 10:00:40", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("6",25.5d,LocalDateTime.parse("2021-01-02 10:00:50", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("7",26.5d,LocalDateTime.parse("2021-01-02 10:01:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("8",27.5d,LocalDateTime.parse("2021-01-02 10:01:10", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))
        );
        //右流
        DataStreamSource<SensorRecord> rightSource = env.fromElements(
                new SensorRecord("1",20.5d,LocalDateTime.parse("2021-01-02 10:00:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("2",21.5d,LocalDateTime.parse("2021-01-02 10:00:10", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("3",22.5d,LocalDateTime.parse("2021-01-02 10:00:20", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("4",23.5d,LocalDateTime.parse("2021-01-02 10:00:30", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("5",24.5d,LocalDateTime.parse("2021-01-02 10:00:40", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("6",25.5d,LocalDateTime.parse("2021-01-02 10:00:50", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("7",26.5d,LocalDateTime.parse("2021-01-02 10:01:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))),
                new SensorRecord("8",27.5d,LocalDateTime.parse("2021-01-02 10:01:10", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))
        );

        //设置事件事件字段和水位线
        WatermarkStrategy<SensorRecord> strategy = WatermarkStrategy.<SensorRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<SensorRecord>() {
                    @Override
                    public long extractTimestamp(SensorRecord element, long recordTimestamp) {
                        return element.getTimeEpochMilli();
                    }
                });

        DataStream<SensorRecord> leftStream = leftSource
                .filter(a -> a != null)
                .assignTimestampsAndWatermarks(strategy);

        DataStream<SensorRecord> rightStream = rightSource
                .filter(a -> a != null)
                .assignTimestampsAndWatermarks(strategy);


        DataStream<SensorRecordJoin> dataStream = leftStream
                .join(rightStream)
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .apply(new MyJoinFunction());
        dataStream.print();
        env.execute();

    }

    public static class MyKeySelector  implements KeySelector<SensorRecord, String> {

        @Override
        public String getKey(SensorRecord value) throws Exception {
            return value.getId();
        }

    }

    public static class MyJoinFunction implements JoinFunction<SensorRecord, SensorRecord, SensorRecordJoin> {

        @Override
        public SensorRecordJoin join(SensorRecord first, SensorRecord second) throws Exception {
            return new SensorRecordJoin(first, second);
        }
    }
}
